feat: [iceberg] Native scan by serializing FileScanTasks to iceberg-rust #2528
mbutrovich merged 154 commits into apache:main
Conversation
Codecov Report

❌ Patch coverage is

Additional details and impacted files

@@             Coverage Diff              @@
##               main    #2528       +/-   ##
=============================================
+ Coverage     56.12%   59.05%     +2.92%
- Complexity      976     1462       +486
=============================================
  Files           119      165        +46
  Lines         11743    15060      +3317
  Branches       2251     2504       +253
=============================================
+ Hits           6591     8893      +2302
- Misses         4012     4899       +887
- Partials       1140     1268       +128

☔ View full report in Codecov by Sentry.
It is promising!
227332c to 6966a12
# Conflicts:
#   native/Cargo.lock
#   spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
…eberg version back to 1.8.1 after hitting known segfaults with old versions.
## Which issue does this PR close?

- Part of #1749.

## What changes are included in this PR?

- Change `ArrowReaderBuilder::new` to be `pub` instead of `pub(crate)`.

## Are these changes tested?

- No new tests for this. Currently being used in DataFusion Comet: apache/datafusion-comet#2528
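As a hedged illustration of what making `ArrowReaderBuilder::new` public enables, here is a minimal sketch of constructing an `ArrowReader` from outside the iceberg crate, which is what Comet's native scan needs. The surrounding calls (`FileIOBuilder::new_fs_io`, `with_batch_size`) reflect iceberg-rust's public API at the time of writing; this is illustrative, not the Comet code.

```rust
// Minimal sketch (not the Comet code): with `ArrowReaderBuilder::new` public,
// an external crate can build the Arrow reader directly from a FileIO.
use iceberg::arrow::ArrowReaderBuilder;
use iceberg::io::FileIOBuilder;

fn build_standalone_reader() -> iceberg::Result<()> {
    let file_io = FileIOBuilder::new_fs_io().build()?; // local-filesystem FileIO
    let _reader = ArrowReaderBuilder::new(file_io) // pub as of this change
        .with_batch_size(8192)
        .build();
    // The reader can then consume a FileScanTaskStream produced elsewhere.
    Ok(())
}
```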
# Conflicts:
#   docs/source/user-guide/latest/configs.md
#   native/Cargo.lock
#   native/Cargo.toml
#   native/core/Cargo.toml
andygrove left a comment:
I don't see any reason not to go ahead and merge this as an experimental feature.
…llel test and setup workflows.
…due to type limitations in Iceberg 1.5.2.
kazuyukitanimura left a comment:
I know this is merged, but one more comment
iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.7'}]
The profile now says to use Iceberg 1.5 with Spark 3.4, but we do not have 1.5 here. Not sure if it causes problems...
Here's what we currently test with this PR:
| | 3.4 | 3.5 | 4.0 |
|---|---|---|---|
| 1.5.2 | CometIcebergNativeSuite, CometFuzzIcebergSuite, IcebergReadFromS3Suite (not run in CI due to MinIO container) | | |
| 1.8.1 | Iceberg Spark Tests, Iceberg Spark Extensions Tests, Iceberg Spark Runtime Tests | Iceberg Spark Tests, Iceberg Spark Extensions Tests, Iceberg Spark Runtime Tests, CometIcebergNativeSuite, CometFuzzIcebergSuite, IcebergReadFromS3Suite (not run in CI due to MinIO container) | |
| 1.9.1 | Iceberg Spark Tests, Iceberg Spark Extensions Tests, Iceberg Spark Runtime Tests | Iceberg Spark Tests, Iceberg Spark Extensions Tests, Iceberg Spark Runtime Tests | |
| 1.10 | Iceberg Spark Tests, Iceberg Spark Extensions Tests, Iceberg Spark Runtime Tests | Iceberg Spark Tests, Iceberg Spark Extensions Tests, Iceberg Spark Runtime Tests | CometIcebergNativeSuite, CometFuzzIcebergSuite, IcebergReadFromS3Suite (not run in CI due to MinIO container) |
I leaned on newer versions for the Iceberg tests because, as best as I could tell, newer versions are a superset of the older versions. For the Comet-native tests we are running 1.5.2.
We should have a discussion about what we want to run long term, because right now tagging a PR with [iceberg] makes CI take hours and spawns so many parallel Iceberg suites that we start getting network timeouts (likely due to throttling).
…chTransformer (apache#1821)

Partially address apache#1749.

This PR adds partition spec handling to `FileScanTask` and `RecordBatchTransformer` to correctly implement the Iceberg spec's "Column Projection" rules for fields "not present" in data files.

Prior to this PR, `iceberg-rust`'s `FileScanTask` had no mechanism to pass partition information to `RecordBatchTransformer`, causing two issues:

1. **Incorrect handling of bucket partitioning**: Couldn't distinguish identity transforms (which should use partition metadata constants) from non-identity transforms like bucket/truncate/year/month (which must read from the data file). For example, `bucket(4, id)` stores `id_bucket = 2` (the bucket number) in partition metadata, but the actual `id` values (100, 200, 300) are only in the data file. iceberg-rust was incorrectly treating bucket-partitioned source columns as constants, breaking runtime filtering and returning incorrect query results.
2. **Field ID conflicts in add_files scenarios**: When importing Hive tables via `add_files`, partition columns could have field IDs conflicting with Parquet data columns. Example: Parquet has field_id=1→"name", but Iceberg expects field_id=1→"id" (partition). Per the spec, the correct field is "not present" and requires name mapping fallback.

Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection), when a field ID is "not present" in a data file, it must be resolved using these rules:

1. Return the value from partition metadata if an **Identity Transform** exists
2. Use `schema.name-mapping.default` metadata to map field id to columns without field id
3. Return the default value if it has a defined `initial-default`
4. Return null in all other cases

**Why this matters:**

- **Identity transforms** (e.g., `identity(dept)`) store actual column values in partition metadata that can be used as constants without reading the data file
- **Non-identity transforms** (e.g., `bucket(4, id)`, `day(timestamp)`) store transformed values in partition metadata (e.g., bucket number 2, not the actual `id` values 100, 200, 300) and must read source columns from the data file

1. **Added partition fields to `FileScanTask`** (`scan/task.rs`):
   - `partition: Option<Struct>` - Partition data from manifest entry
   - `partition_spec: Option<Arc<PartitionSpec>>` - For transform-aware constant detection
   - `name_mapping: Option<Arc<NameMapping>>` - Name mapping from table metadata
2. **Implemented `constants_map()` function** (`arrow/record_batch_transformer.rs`):
   - Replicates Java's `PartitionUtil.constantsMap()` behavior
   - Only includes fields where the transform is `Transform::Identity`
   - Used to determine which fields use partition metadata constants vs. reading from data files
3. **Enhanced `RecordBatchTransformer`** (`arrow/record_batch_transformer.rs`):
   - Added `build_with_partition_data()` method to accept partition spec, partition data, and name mapping
   - Implements all 4 spec rules for column resolution with identity-transform awareness
   - Detects field ID conflicts by verifying that both field ID AND name match
   - Falls back to name mapping when field IDs are missing/conflicting (spec rule #2)
4. **Updated `ArrowReader`** (`arrow/reader.rs`):
   - Uses `build_with_partition_data()` when partition information is available
   - Falls back to `build()` when not available
5. **Updated manifest entry processing** (`scan/context.rs`):
   - Populates partition fields in `FileScanTask` from manifest entry data

New unit tests:

1. **`bucket_partitioning_reads_source_column_from_file`** - Verifies that bucket-partitioned source columns are read from data files (not treated as constants from partition metadata)
2. **`identity_partition_uses_constant_from_metadata`** - Verifies that identity-transformed fields correctly use partition metadata constants
3. **`test_bucket_partitioning_with_renamed_source_column`** - Verifies field-ID-based mapping works despite a column rename
4. **`add_files_partition_columns_without_field_ids`** - Verifies name mapping resolution for Hive table imports without field IDs (spec rule #2)
5. **`add_files_with_true_field_id_conflict`** - Verifies correct field ID conflict detection with name mapping fallback (spec rule #2)
6. **`test_all_four_spec_rules`** - Integration test verifying all 4 spec rules work together

Yes, there are 6 new unit tests covering all 4 Iceberg spec rules. This also resolved approximately 50 Iceberg Java tests when running with DataFusion Comet's experimental apache/datafusion-comet#2528 PR.

Co-authored-by: Renjie Liu <liurenjie2008@gmail.com>
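As a reader aid, here is a minimal sketch of the `constants_map()` idea described in the commit message above: only identity-transformed partition fields become constants keyed by source field id, while bucket/truncate/date transforms are left to be read from the data file. The types come from iceberg-rust's public `spec` module, but the signature is simplified (the actual function takes the manifest entry's partition `Struct`), so treat this as an assumption-laden sketch rather than the committed code.

```rust
use std::collections::HashMap;

use iceberg::spec::{Literal, PartitionSpec, Transform};

// Sketch: map source field id -> constant value, but only for identity transforms.
// `partition_values` stands in for the values of the manifest entry's partition Struct.
fn constants_map(
    spec: &PartitionSpec,
    partition_values: &[Option<Literal>],
) -> HashMap<i32, Option<Literal>> {
    spec.fields()
        .iter()
        .zip(partition_values)
        .filter(|(field, _)| field.transform == Transform::Identity)
        .map(|(field, value)| (field.source_id, value.clone()))
        .collect()
}
```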
…chTransformer (apache#1821) (#107)

Co-authored-by: Matt Butrovich <mbutrovich@users.noreply.github.com>
Co-authored-by: Renjie Liu <liurenjie2008@gmail.com>
Hey @mbutrovich, thanks for the commit! This is super impactful :) I'm having some issues when using a RestCatalog. Using Comet 0.12, iceberg 1.10.0, spark 3.5.6 with scala 2.13. Did you only test this for the Hadoop catalog? Or did you try other types as well?
Hi @jordepic! Thanks for testing this out!
@hsiang-c tested this with a REST catalog, and in theory we have a test that exercises this as well after #2895. I'm wondering if the jars aren't all getting loaded when used with Jupyter notebooks? I'm not as familiar with this scenario. Would you mind opening a new issue and we can track discussion there?

Edit: I just realized you said 0.12.0. Unfortunately, the REST catalog support came after the 0.12.0 release, so you might have to wait for 0.13.0 or build a Comet jar from source.
Yes, we need the upcoming release for REST catalog support, or you can build the JAR yourself.
This PR introduces a new approach for integrating Apache Iceberg with Comet using iceberg-rust, enabling fully-native Iceberg table scans without requiring changes to upstream Iceberg Java code.
Rationale for this change
I was inspired by @RussellSpitzer's recent talk and wanted to revisit the abstraction layer at which Comet integrates with Iceberg.
Our current `iceberg_compat` approach requires code changes in Iceberg Java to integrate with Parquet reader instantiation, creating a tight coupling between Comet and Iceberg. This PR instead works at the `FileScanTask` layer, after Iceberg's planning phase is complete. This enables fully-native Iceberg scans (similar to our `native_datafusion` scans) without any changes in upstream Iceberg Java code.

All catalog access and planning continue to happen through Spark's Iceberg integration (unchanged), but file reading is delegated to iceberg-rust, which provides better parallelism and integrates naturally with Comet's native execution engine.
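To make the hand-off concrete, here is a hedged sketch (not the actual Comet code) of the boundary: the JVM side serializes each planned `FileScanTask`, for example as JSON, and the native side rebuilds it with serde before handing it to iceberg-rust. This assumes `FileScanTask` implements serde `Serialize`/`Deserialize`, which recent iceberg-rust versions provide; the exact wire format Comet uses may differ.

```rust
use iceberg::scan::FileScanTask;

// Rebuild FileScanTasks that the JVM side shipped across as JSON strings.
// Error handling is simplified for illustration.
fn deserialize_tasks(json_tasks: &[String]) -> iceberg::Result<Vec<FileScanTask>> {
    json_tasks
        .iter()
        .map(|json| {
            serde_json::from_str::<FileScanTask>(json)
                .map_err(|e| iceberg::Error::new(iceberg::ErrorKind::DataInvalid, e.to_string()))
        })
        .collect()
}
```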
What changes are included in this PR?
This implementation follows a similar pattern to `CometNativeScanExec` for regular Parquet files, but extracts and serializes Iceberg's `FileScanTask` objects.

Scala/JVM side:
- `CometIcebergNativeScanExec` operator that replaces Spark's Iceberg `BatchScanExec`
- Extracts `FileScanTask` objects from Iceberg's planning output

Native/Rust side:
- `IcebergScanExec` operator that consumes serialized `FileScanTask` objects (see the sketch below)
- Uses `FileIO` and `ArrowReader` to read data files
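The sketch below shows what the native consumption path could look like: deserialized tasks are wrapped into the stream type that iceberg-rust's `ArrowReader` expects, and the resulting Arrow batches are polled. Names follow iceberg-rust's public API at the time of writing; the real `IcebergScanExec` wires this into DataFusion's execution model rather than a plain loop.

```rust
use futures::{stream, StreamExt};
use iceberg::arrow::ArrowReaderBuilder;
use iceberg::io::FileIO;
use iceberg::scan::{FileScanTask, FileScanTaskStream};

async fn scan_tasks(file_io: FileIO, tasks: Vec<FileScanTask>) -> iceberg::Result<()> {
    // Build a standalone reader from the FileIO configured for the table's storage.
    let reader = ArrowReaderBuilder::new(file_io).build();

    // ArrowReader::read expects a boxed stream of Result<FileScanTask>.
    let task_stream: FileScanTaskStream = stream::iter(tasks.into_iter().map(Ok)).boxed();

    let mut batches = reader.read(task_stream)?;
    while let Some(batch) = batches.next().await {
        let batch = batch?; // each item is an Arrow RecordBatch
        println!("read {} rows", batch.num_rows());
    }
    Ok(())
}
```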
How are these changes tested?

- `CometIcebergNativeSuite` with basic scenarios, but also a number of challenging situations from the Iceberg Java test suite
- `CometFuzzIcebergSuite` that we can adapt to Iceberg-specific logic
- `IcebergReadFromS3Suite` to test passing basic S3 credentials

Benefits over `iceberg_compat`

- Similar to `native_datafusion`, not constrained by Iceberg Java's reader design
- `ArrowReader`

Current Limitations & Open Questions

- `ArrowReaderOptions` to benefit from previous work in Arrow-rs: Support different TimeUnits and timezones when reading Timestamps from INT96 (arrow-rs#7285)
- `iceberg_compat` code and its Iceberg Java entanglement

Related Work
Slides from the 10/9/25 Iceberg-Rust community call: iceberg-rust.pdf